Kubeflow Pipelines
ここでは Kubeflow Pipelines 一般についてまとめる
Notebookx や Profile や Dashboard や Serving 的なものはしらない、パイプラインのとこだけ使ってる
事前情報
DSL がしんどくて特におすすめはしない
Python コードとしては valid でも実際に動かそうとするとうまくいかなくてハマることが多い
Type Hint を入出力の型に使っていて、型は合っていてもジェネリクスがダメとか実行時は違うとかでハマる
list[str] と書くと実行できなくて list と書くとか、DSL 解釈内部の気持ちを推測して書く最悪の体験
Python を実行しているつもりでいてはいけない、Python コードによって生成されたパイプラインが実行されている
コンポーネントごとに実行時に GPU 付けて終わったら捨てる等ができる、サーバーレスに近い体験、そして安い
世界観
リモートで動かすものである、ローカルのファイル渡したり読んだりはできない
単純な値なら pipeline 引数に渡す / そうでないなら Cloud Storage に置いて参照する
事前に DAG (有効非巡回グラフ) を構築して動かすものである
動的にフローが変わったり、実行するステップ数が変わるようなものは向いてないorできない
kfp.v2.dsl は 1 時代の移行用、そもそも 2 以降を使ってたら kfp.dsl でいい
ランタイムは実質 Vertex AI Pipeline 用? OSS のバックエンドで動かない機能がちりばめられててそれでいいのか
コンポーネントとパイプライン
パイプラインはコンポーネントを呼び出してつないでいく
パイプライン自体をコンポーネントとして別のパイプラインから呼べる
値を返す時もコンポーネントと同じ
コンポーネントは3種
1関数に閉じて(Hermetic に)実装(import は全部関数内で書く)
packages_to_isntall=[] で依存を指定して実行時にインストール
base_image 省略した時は python:3.7 で激古なので実質常に設定することになる
base_image は実はなんでもいい、python3 と pip が入っている(python3 -m ensurepip) のが条件
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main ... にオプションやコードがテキストで渡って実行される、コンパイル結果見たら色々わかる
base_image, target_image を指定し kpf compnent build でビルド
kfp component build src/ --component-filepattern my_component.py --push-image
実行時に使うのは target_image、ファイル単位で埋め込むの関数外に import 書いてもよい
依存関係の固定にはこちらがよいが、イメージ更新 & 実行 の 2 ステップが要るので小さいうちはやらないかな
イメージを自作する状況なら大抵他にも色々入れることになり kfp component build では足りなくなり中途半端
生成される Dockerfile を書き換えて運用したりしてるの?
これ使わず別のビルドパイプラインで Docker イメージを作って、Lightweight か Container にしがち
任意のコンテナを実行する、実装内容は自由になる
コマンド引数に渡せるものしか入出力できない、簡単な入出力以外は全てファイルパス渡す形になる
機能が限定されすぎている Artifact やら Metadata 書くには後続にもう1コンポーネント作る羽目になる
書いたコードがそのまま実行されてるわけじゃない、これは DSL だ!!!
何度繰り返してもいい概念
DSL メモ
コンポーネント入出力の型は常に書く、呼ぶ時も常に名前付き引数で呼ぶ、実行時に参照される
@dsl.component デコレータのついた関数を呼んだ返り値は PipelineTask
hoge コンポーネントを呼んだものの命名は hoge_task や hoge_op が多い
hoge コンポーネントを作って呼んで hoge_task に束縛し出力を hoge_task.output で次に渡す
型
コンポーネントやパイプラインの基底は kfp.dsl.base_component.BaseComponent
kfp.dsl.base_component.PipelineTask
値の受け渡しは kfp.dsl.pipeline_channel.PipelineChannel を通って行われる
task.output は PipelineChannel、 task.outputs は Mapping[str, PipelineChannel]
foo_task.set_caching_options(False) コンポーネント単位でキャッシュオフ
foo_task.set_env_variable("FOO", "1,2,3") のように環境変数セットできる、
コンポーネントへの入出力
コンポーネントの Type Hint で入出力を表現している、型チェック時だけのおまけ扱いじゃない
DSL として使えるのは
基本的な型
primitive なスカラ型 str, int, float, bool と コレクション dict か list
dsl.Input[], dsl.Output[] コンテナ型
Input[Dataset] のように型パラメータは dsl.Artifact (とそのサブクラス)を取る
dsl.InputPath(), dsl.OutputPath()
Input, Output の path だけ受け渡す形式
Python の実行不要な ContainerComponent でコンテナの実行引数に渡して読み込ませる or 書き込ませる用途
dsl.Artifact とそのサブクラス
dsl.Dataset, dsl.Model
dsl.Metrics, dsl.ClassificationMetrics, dsl.Classificationmetrics
dsl.HTML, dsl.Markdown
注意 dict[str, int] や list[str] 等のジェネリクス型は扱えない
いや型パラメータ読み飛ばしてくれよ...と思うけどダメ、知らないと普通に型付けてめちゃハマる
output, input は型パラメータのものに変換可能
Output[Artifact], OutputPath(dsl.Artifact) の output は別のステップの Input[Artifact] や Artifact 引数に渡せる、型パラメータ & 引数の対応を見ている
Output の2つの書き方
コンポーネント引数で Output を宣言する (Traditional artifact syntax)
Output, OutputPath はシグネチャで宣言だけして自分では渡さない、実行時に渡されるものに値を埋める
引数に受け取ったオブジェクトに値をセットする、オブジェクトの持つパスに内容を書き込む
code:input_output.py
@dsl.component
def training_component(dataset: InputDataset, model: OutputModel): with open(dataset.path) as f:
contents = f.read()
# ... train tf_model model on contents of dataset ...
tf_model.save(model.path)
tf_model.metadata = { ... }
関数の返り値型に書く & 値を返す (Pythonic artifact syntax)
dsl.Output のラップは不要で dsl.Artifact を返せばよい
Container Component には対応してない(Python コードをそのまま実行しないので)
code:output.py
@dls.component
def my_component(source_uri: str) -> dsl.Dataset:
...
out = dsl.Dataset(
uri=dsl.get_uri(suffix="foo.json"),
metadata={"model": model_name, "source_uri": source_uri},
)
with open(out.path, 'w') as f:
f.write(...)
return out
多値を返す時は NamedTuple 使う
入出力は pipeline_root の Cloud Storage バケットに記録される
DSL 記述上に型書いてるだけで、実行時は違う
@component については呼び出し側が待って値を解決して呼ぶイメージ
それ以外は Channel や Placeholder が渡ってくる
イメージ図
url をコンテナで fetch して Dataset にして返すパイプライン
https://gyazo.com/21d583cae9bffc1e881f5d1dd5914558
複数の Output
Hermetic に書く都合上、関数のシグネチャと値作るところで2回書くことになる
サンプルコードは mypy がうまく扱えないのでダルい https://gyazo.com/af35e5e39141423083833518a4324b0c
代替案
これなら type: ignore 1個で済む & 関数内での型チェックは働く(シグネチャとはだめ)
code:namedtupleoutput.py
@dsl.component
def my_comp1() -> NamedTuple("Outputs", a=int, b=str): # type: ignore
from collections import namedtuple
Outputs = namedtuple("Outputs", "a", "b") return Outputs(1, "hello")
# あるいは
@dsl.component
def my_comp2() -> NamedTuple("Outputs", a=int, b=str): # type: ignore
return Outputs(1, "hello")
Output & OutputPath は Input, Artifact に変換可能
OutputPath に書き出したものを次のステップに渡すには InputPath でないといけないわけではない
output: OutputPath(Artifact) に値を設定する時は output はパスの文字列だけど、
それを別のコンポーネントに渡す時は InputPath(Artifact) にも Input[Artifact] にも Artifact にも渡せる
code:output.py
@dsl.component
with open(output.path, "w") as f:
f.write("traditional")
@dsl.component
def pythonic_output() -> dsl.Artifact:
output = dsl.Artifact(uri=dsl.get_uri())
with open(output.path, "w") as f:
f.write("pythonic")
@dsl.component
def output_path(output: dsl.OutputPath(dsl.Artifact)): # type: ignore
with open(output, "w") as f:
f.write("output path")
@dsl.component
def count_file(input: dsl.Artifact) -> int:
with open(input.path, "r") as f:
return len("".join(f.readlines()))
@dsl.pipeline
def pipeline():
pythonic_output_task = pythonic_output()
count_file(input=pythonic_output_task.output)
traditional_output_task = traditional_output()
count_file(input=traditional_output_task.output)
output_path_task = output_path()
count_file(input=output_path_task.output)
Artifact だけでなく OutputPath(str) や Output(str) を str を取るコンポーネントに渡せる
そして OutputhPath(int) に数値文字列(ファイルに書くので)を書いて int を引数に取るコンポーネントにも渡せる
ただし InputPath() に変換できるのは Artifact 系の出力だけっぽい
まあ InputPath(int) や InputPath(str) が必要な状況は無くて int や strで受ければいいのだが
条件分岐
code:quote_if.py
@dsl.pipeline
def my_pipeline():
coin_flip_task = flip_three_sided_coin()
with dsl.If(coin_flip_task.output == 'heads'):
print_comp(text='Got heads!')
with dsl.Elif(coin_flip_task.output == 'tails'):
print_comp(text='Got tails!')
with dsl.Else():
print_comp(text='Draw!')
あんまつかってない、条件内で output を参照できる
TODO 両辺で output いけるかみる
ループ
for で書く
パイプラインとしてはつながらないが複数のコンポーネントを呼ぶことはできる
コンパイル時に for して DAG を作っていることを忘れないこと
sleep したりしてもコンパイル遅いだけで意味ないです
dsl.ParallelFor
並列度指定できる
code:quote_parallel.py
@dsl.pipeline
def my_pipeline():
with dsl.ParallelFor(
parallelism=2
) as epochs:
train_model(epochs=epochs)
list[dict] に対してのループの注意
code:example.py
with dsl.ParallelFor(
parallelism=1
) as item:
task1 = my_component(..., number=item.a)
task2 = my_component(..., number=item.b)
のように items に list[dict] を渡せるように見えるが実際は単体の値型しか渡せない
{"a":1, "b":2} や {"a":"1", "b":"2"} は通るが {"a":1, "b":"2"} のようなものはダメ
pipeline parameters を使う方法についてコメントしておいた
dict はネストできない
ループの各値は LoopParameterArgument で、Not Subscriptable である(d['key'] できない)
item.a のような参照は LoopArgumentVariable が返っているがこれの子要素を辿る方法はない?
上の例で item.a.foo みたいな参照はエラーになる
なのでフラットなオブジェクトを PararellFor に渡す(大抵呼び出す側の Pipeline にも)必要がある
dict に name が入っているとおかしくなる、name というキー名を避けよう
code:name.py
with dsl.ParallelFor(items=items) as item:
task1 = my_component(..., name=item.name) # name="items-loop-item" になってる
のような呼び出しをすると name が別の値(ループの名前)に置き換わっている
items の変数名が採用される? この例だと items-loop-item
値の参照を __getattr__ でやっているから?
LoopParameterArgument は PipelineParameterChannel を継承していて name 持っている
最悪
中の実装見て使ってない名前を選ぶ必要があり、発狂する
眺めるに task, task_name, items, full_name, pattern channel_type とかもダメだろう
name をキーに持つ dataset_defs にループを回してメタデータめちゃくちゃになったね
https://gyazo.com/c4af35880372ea58e4c39047a92e8b1d
壁殴る代わりに Issue 立てたい
Lightweight Python Component で import する
独自にビルドしたイメージをbase_image に設定して使える、Python コードを COPY していれば import もできる
component 実装内で sys.path.append("/app/src") などしてパスを通す
パイプライン側で comp1_task = comp1(...).set_env_variable("PYTHONPATH", "/app/src") でも良い?
ローカルでは set_env_variable の動作が確認できないので試してない
Container Component の出力
OutputPath を使うしかない
ディレクトリ自分で作らないといけない、いやそれは作ってよ
コンテナ側で書き込み先パスのディレクトリが無い想定で mkdir -p 相当をするように作ってあるとよい
ここでは args と $0, $1 でやっているが、f-string で文字列展開してもよい
返り値を参照する時は curl_task.outputs['content'] のように引数名の dict になっている
code:container_component.py
@dsl.container_component
def curl(url: str, content: dsl.OutputPath(str)): # type: ignore
return dsl.ContainerSpec(
image="curlimages/curl:8.6.0",
)
コンポーネントへの入力型を信用するな
一見型チェック通ってても関係ない諦めろ
@dsl.pipeline への入力
kfp.dsl.pipeline_channel.PipelineParameterChannel
@dsl.container_component への入力
kfp.dsl.placeholders.InputValuePlaceholder
@dsl.component は素直?
code:input_types.py
@dsl.container_component
def fetch_url(url: str, content: dsl.OutputPath(str)): # type: ignore
print(type(url)) # kfp.dsl.placeholders.InputValuePlaceholder
return dsl.ContainerSpec(
image="curlimages/curl:8.6.0",
)
@dsl.component
def save_as_artifact(content: str, metadata: dict = {}) -> dsl.Dataset:
print(type(content)) # これは str
out = dsl.Dataset(uri=dsl.get_uri(suffix="content.html"), metadata=metadata)
with open(out.path, "w") as f:
f.write(content)
return out
@dsl.pipeline
def download_pipeline(url: str) -> dsl.Dataset:
print(type(url)) # kfp.dsl.pipeline_channel.PipelineParameterChannel
fetch_task = fetch_url(url=url)
return save_as_artifact(
# metadata={"url": url},
# こう渡せば行けそうに見えるが、PipelineParameterChannel の値を待って解決する人がいないのでダメ
).output
@dsl.pipeline
def pipeline() -> None:
https://gyazo.com/21d583cae9bffc1e881f5d1dd5914558
Importer
外部の Artifact を取り込むやつ、取り込むことで ML Metadata に記録される
実行環境でアクセスできる URI を渡す形だが、Metadata の記録が不要なら単に URI 文字列を渡せばよいので、
モデルへの入力を残したいモチベーションがある時に使う
ローカル実行
code:local.py
from kfp import dsl, local
local.init(runner=local.DockerRunner())
...
if __name__ == "__main__":
pipeline() # 単に実装した pipeline を呼び出す
動作確認やコンパイルの確認に便利だが、色々対応してないのでまあコンパイル通るだけよりちょっとマシなぐらい
set_env_variable 使えない?
dsl.Collected is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends.
other orchestration backends て何? Vertex AI Pipeline 以外に使えるのあんの?
dsl.Artifact(uri=dsl.get_uri(suffix="out.txt")) のように suffix 付きで作った Artifact を読めない
{pipeline_root}/.../Output を決め打ちで読んでいる? こういうのがいっちゃんダルい
メタデータ変数
dsl.PIPELINE_JOB_NAME_PLACEHOLDER
パイプライン名-yyyyMMddHHmmss
dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
projects/{project_number}/locations/{location}/pipeineJobs/PIPELINE_JOB_NAME
dsl.PIPELINE_JOB_ID_PLACEHOLDER
数値の ID
dsl.PIPELINE_TASK_NAME_PLACEHOLDER
実行 Component 名
dsl.PIPELINE_TASK_ID_PLACEHOLDER
コンポーネントID? コンポーネントごとの出力に使われる
dsl.PIPELINE_JOB_CREATE_TIME_UTC_PLACEHOLDER
UTC 時刻
dsl.PIPELINE_JOB_SCHEDULE_TIME_UTC_PLACEHOLDER
epoch 0、スケジュール実行なら値が入る
dsl.PIPELINE_ROOT_PLACEHOLDER
pipeline_root
pipeline 内で参照できる、component で見るには渡さないといけない
PIPELINE_JOB_NAME_PLACEHOLDER は {{$.pipeline_job_name}} のような値で pipeline で解決される
component で直接参照してもダメ
ループで component を呼び出す時、PIPELINE_TASK_ID_PLACEHOLDER など実行の時に採番されてちゃんと別々の値になる